# -*- coding: utf-8 -*-
from datetime import timedelta
from utils.operators.spark_submit import SparkSubmitOperator
from jms.time_sensor import time_after_03_00

# Oracle connection settings. Each value is a Jinja expression that reads the
# `oracle_bigdata_19c50` Airflow Variable (JSON), so the real URL, user and
# password never appear in this file. Do not hard-code credentials here,
# not even in commented-out lines.
jdbcUrl = '{{ var.json.oracle_bigdata_19c50.url }}'
username = '{{ var.json.oracle_bigdata_19c50.username }}'
password = '{{ var.json.oracle_bigdata_19c50.password }}'

# Date placeholders. `cst_ds` and `date_add` are project-defined Jinja filters
# that are not visible in this file; presumably they yield the date string of
# the execution date and of the following day -- confirm against the filter
# definitions.
nowdt = '{{ execution_date | cst_ds }}'
nextdt = '{{ execution_date | date_add(1) | cst_ds }}'

# Source table name substituted into the reader section of the job config.
table = "yl_oms_oms_waybill"

# Environment name, read from the `env_sync` Airflow Variable.
env = '{{ var.value.env_sync }}'

# Job description for the sync tool: an Oracle "reader", a "channel" and a
# Hive "writer" section. The text is a template; the tokens jdbcUrlpara,
# usernamepara, passwordpara, nowdt, nextdt, tablepara and envpara are
# replaced by plain string substitution right after the literal.
#
# "where" holds a list of predicates separated by ';' (presumably one read
# split per predicate -- confirm in the sync tool). The first predicate
# selects rows whose input_time falls in [nextdt 00:00, nextdt 03:00); every
# other predicate selects rows whose last_update_time falls in that same
# window, sliced by input_time into contiguous ranges that get coarser the
# older the rows are, ending with an open-ended "older than nowdt - 51 days".
#
# NOTE(review): the input_time slices cover [nextdt 00:00, nextdt 03:00) and
# everything before nowdt 00:00, but no slice covers
# [nowdt 00:00, nextdt 00:00). Rows created on nowdt and updated in the
# 00:00-03:00 window are therefore not selected by this task -- confirm that
# this is intentional (e.g. handled by another task).
#
# NOTE(review): the top-level key "settting" is spelled with three t's; it is
# runtime text read by the job, so check the consumer before changing it.
jsonpara = """{
"reader":{
"connect":{
"url":"jdbcUrlpara",
"username":"usernamepara",
"password":"passwordpara",
"driver":"oracle.jdbc.driver.OracleDriver"
},
"dbtype":"oracle",
"tableName":"tablepara",
"indexName":"YL_OMS_OMS_WAYBILL_INDX4",
"where":"
(input_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and input_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss')) ;

(input_time>=to_date('nowdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-1  and input_time<to_date('nowdt 14:00:00', 'yyyy-mm-dd hh24:mi:ss')-1) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 14:00:00', 'yyyy-mm-dd hh24:mi:ss')-1  and input_time<to_date('nowdt 16:00:00', 'yyyy-mm-dd hh24:mi:ss')-1) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 16:00:00', 'yyyy-mm-dd hh24:mi:ss')-1  and input_time<to_date('nowdt 17:00:00', 'yyyy-mm-dd hh24:mi:ss')-1) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 17:00:00', 'yyyy-mm-dd hh24:mi:ss')-1  and input_time<to_date('nowdt 18:00:00', 'yyyy-mm-dd hh24:mi:ss')-1) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 18:00:00', 'yyyy-mm-dd hh24:mi:ss')-1  and input_time<to_date('nowdt 19:00:00', 'yyyy-mm-dd hh24:mi:ss')-1) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 19:00:00', 'yyyy-mm-dd hh24:mi:ss')-1  and input_time<to_date('nowdt 20:00:00', 'yyyy-mm-dd hh24:mi:ss')-1) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 20:00:00', 'yyyy-mm-dd hh24:mi:ss')-1  and input_time<to_date('nowdt 22:00:00', 'yyyy-mm-dd hh24:mi:ss')-1) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 22:00:00', 'yyyy-mm-dd hh24:mi:ss')-1 and input_time<to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-1) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-2  and input_time<to_date('nowdt 14:00:00', 'yyyy-mm-dd hh24:mi:ss')-2) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 14:00:00', 'yyyy-mm-dd hh24:mi:ss')-2  and input_time<to_date('nowdt 16:00:00', 'yyyy-mm-dd hh24:mi:ss')-2) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 16:00:00', 'yyyy-mm-dd hh24:mi:ss')-2  and input_time<to_date('nowdt 17:00:00', 'yyyy-mm-dd hh24:mi:ss')-2) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 17:00:00', 'yyyy-mm-dd hh24:mi:ss')-2  and input_time<to_date('nowdt 18:00:00', 'yyyy-mm-dd hh24:mi:ss')-2) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 18:00:00', 'yyyy-mm-dd hh24:mi:ss')-2  and input_time<to_date('nowdt 19:00:00', 'yyyy-mm-dd hh24:mi:ss')-2) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 19:00:00', 'yyyy-mm-dd hh24:mi:ss')-2  and input_time<to_date('nowdt 20:00:00', 'yyyy-mm-dd hh24:mi:ss')-2) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 20:00:00', 'yyyy-mm-dd hh24:mi:ss')-2  and input_time<to_date('nowdt 22:00:00', 'yyyy-mm-dd hh24:mi:ss')-2) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 22:00:00', 'yyyy-mm-dd hh24:mi:ss')-2 and input_time<to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-2) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-3  and input_time<to_date('nowdt 14:00:00', 'yyyy-mm-dd hh24:mi:ss')-3)  and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 14:00:00', 'yyyy-mm-dd hh24:mi:ss')-3  and input_time<to_date('nowdt 16:00:00', 'yyyy-mm-dd hh24:mi:ss')-3)  and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 16:00:00', 'yyyy-mm-dd hh24:mi:ss')-3  and input_time<to_date('nowdt 18:00:00', 'yyyy-mm-dd hh24:mi:ss')-3)  and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 18:00:00', 'yyyy-mm-dd hh24:mi:ss')-3  and input_time<to_date('nowdt 20:00:00', 'yyyy-mm-dd hh24:mi:ss')-3)  and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 20:00:00', 'yyyy-mm-dd hh24:mi:ss')-3  and input_time<to_date('nowdt 22:00:00', 'yyyy-mm-dd hh24:mi:ss')-3)  and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 22:00:00', 'yyyy-mm-dd hh24:mi:ss')-3 and input_time<to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-3)  and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-4  and input_time<to_date('nowdt 14:00:00', 'yyyy-mm-dd hh24:mi:ss')-4)  and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 14:00:00', 'yyyy-mm-dd hh24:mi:ss')-4  and input_time<to_date('nowdt 16:00:00', 'yyyy-mm-dd hh24:mi:ss')-4)  and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 16:00:00', 'yyyy-mm-dd hh24:mi:ss')-4  and input_time<to_date('nowdt 18:00:00', 'yyyy-mm-dd hh24:mi:ss')-4)  and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 18:00:00', 'yyyy-mm-dd hh24:mi:ss')-4  and input_time<to_date('nowdt 20:00:00', 'yyyy-mm-dd hh24:mi:ss')-4)  and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 20:00:00', 'yyyy-mm-dd hh24:mi:ss')-4  and input_time<to_date('nowdt 22:00:00', 'yyyy-mm-dd hh24:mi:ss')-4)  and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 22:00:00', 'yyyy-mm-dd hh24:mi:ss')-4 and input_time<to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-4)  and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-5  and input_time<to_date('nowdt 18:00:00', 'yyyy-mm-dd hh24:mi:ss')-5)  and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 18:00:00', 'yyyy-mm-dd hh24:mi:ss')-5 and input_time<to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-5)  and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-6 and input_time<to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-6) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-7 and input_time<to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-7) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-8 and input_time<to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-8) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-9 and input_time<to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-9) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-10 and input_time<to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-10) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-12 and input_time<to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-11) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-14 and input_time<to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-13) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-17 and input_time<to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-15) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-20 and input_time<to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-18) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-23 and input_time<to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-21) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-26 and input_time<to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-24) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-31 and input_time<to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-27) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-36 and input_time<to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-32) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-42 and input_time<to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-37) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time>=to_date('nowdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-51 and input_time<to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-43) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
(input_time<to_date('nowdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss')-51) and last_update_time>=to_date('nextdt 00:00:00', 'yyyy-mm-dd hh24:mi:ss') and last_update_time<to_date('nextdt 03:00:00', 'yyyy-mm-dd hh24:mi:ss');
",
"query":"",
"splitColumn":"",
"equalitySectioning":0,
"containsnull":0,
"fetchsize":"1024",
"threadNumber":-1
},
"channel":{
"filterAbnormalCharacter":1
},
"writer":{
"dbtype":"hive",
"tableName":"yl_oms_oms_waybill_hi",
"database":"jms_ods",
"writeMode": "overwrite",
"partitionColumn":"dt",
"partitionValue":"nextdt"},
"settting":{
"env":"envpara"}
}"""

# Substitute the placeholders one after another. The order is significant
# for plain-text replacement, so it is kept as an ordered sequence of pairs.
for _placeholder, _value in (
    ("jdbcUrlpara", jdbcUrl),
    ("usernamepara", username),
    ("passwordpara", password),
    ("nowdt", nowdt),
    ("nextdt", nextdt),
    ("tablepara", table),
    ("envpara", env),
):
    jsonpara = jsonpara.replace(_placeholder, _value)
# Loop variables are scratch names only; do not leave them in the module namespace.
del _placeholder, _value

# from airflow.operators.dummy_operator import DummyOperator
# jms_ods__yl_oms_oms_waybill__0300 = DummyOperator(
#     task_id='jms_ods__yl_oms_oms_waybill__0300',
#     email='songjun@yl-scm.com',
#     retries=0,
#     priority_weight=0,
#     # sla=timedelta(hours=2),
#     on_success_callback=yl_threeSegCodeOnSuccess(kwargs(), dingding_conn_id="dingding_ThreeSeg_etl_info"),
#     on_failure_callback=yl_threeSegCodeOnFailure(kwargs(), dingding_conn_id="dingding_ThreeSeg_etl_alert"),
# )

# Spark task that runs the sync job described by `jsonpara`.
# The rendered job config is passed to the jar as its only argument.
jms_ods__yl_oms_oms_waybill__0300 = SparkSubmitOperator(
    # Task identity; the Spark application name carries the target date.
    task_id="jms_ods__yl_oms_oms_waybill__0300",
    name="jms_ods__yl_oms_oms_waybill__0300_{{ execution_date | date_add(1) |  cst_ds }}",
    email=["yl_etl@yl-scm.com", "yl_bigdata@yl-scm.com"],
    # Scheduling limits: 12 slots of the `oracle_tab` pool, one-hour timeout.
    pool="oracle_tab",
    pool_slots=12,
    execution_timeout=timedelta(hours=1),
    # Initial Spark resources; dynamic allocation (below) can add executors.
    driver_memory="4G",
    executor_memory="4G",
    executor_cores=2,
    num_executors=2,
    conf={
        "spark.dynamicAllocation.enabled": "true",
        "spark.shuffle.service.enabled": "true",
        "spark.dynamicAllocation.maxExecutors": 13,
        "spark.dynamicAllocation.cachedExecutorIdleTimeout": 30,
        "spark.sql.sources.partitionOverwriteMode": "dynamic",
        "spark.executor.memoryOverhead": "1G",
    },
    java_class="com.yunlu.bigdata.jobs.synchrotool.DataSynchDriver",  # Spark main class
    application="hdfs:///scheduler/jms/spark/rgf/spark_sync.jar",  # Spark application jar
    application_args=[jsonpara],
)

# Make `time_after_03_00` (imported from jms.time_sensor) an upstream
# dependency of the sync task, assuming the usual Airflow `<<` semantics;
# presumably it is a sensor that holds the task back until 03:00 -- confirm.
jms_ods__yl_oms_oms_waybill__0300 << time_after_03_00